import Tabs from "@theme/Tabs";
import TabItem from "@theme/TabItem";

# Flink 快速开始

<!--
SPDX-FileCopyrightText: 2023 LakeSoul Contributors

SPDX-License-Identifier: Apache-2.0
-->

  
## 版本支持

| LakeSoul                   | Flink                                                    |
|----------------------------|----------------------------------------------------------|
 3.0.x+       | 1.20
 2.4.x+       | 1.17
 2.1.x-2.3.x | 1.14    

## PG配置

添加如下配置到 `$FLINK_HOME/conf/flink-conf.yaml`:
```yaml
containerized.master.env.LAKESOUL_PG_DRIVER: com.lakesoul.shaded.org.postgresql.Driver
containerized.master.env.LAKESOUL_PG_USERNAME: root
containerized.master.env.LAKESOUL_PG_PASSWORD: root
containerized.master.env.LAKESOUL_PG_URL: jdbc:postgresql://localhost:5432/test_lakesoul_meta?stringtype=unspecified
containerized.taskmanager.env.LAKESOUL_PG_DRIVER: com.lakesoul.shaded.org.postgresql.Driver
containerized.taskmanager.env.LAKESOUL_PG_USERNAME: root
containerized.taskmanager.env.LAKESOUL_PG_PASSWORD: root
containerized.taskmanager.env.LAKESOUL_PG_URL: jdbc:postgresql://localhost:5432/test_lakesoul_meta?stringtype=unspecified
```
注意master和taskmanager都需要设置

:::tip
Postgres 数据库的连接信息、用户名密码需要根据实际情况修改。
:::

:::caution
注意如果使用 Session 模式来启动作业，即将作业以 client 方式提交到 Flink Standalone Cluster，则 `flink run` 作为 client，是不会读取上面配置，因此需要再单独配置环境变量，即：

```bash
export LAKESOUL_PG_DRIVER=com.lakesoul.shaded.org.postgresql.Driver
export LAKESOUL_PG_URL=jdbc:postgresql://localhost:5432/test_lakesoul_meta?stringtype=unspecified
export LAKESOUL_PG_USERNAME=root
export LAKESOUL_PG_PASSWORD=root
````
:::

## SQL
### 下载LakeSoul Flink Jar
可以在 LakeSoul Release 页面下载: https://github.com/lakesoul-io/LakeSoul/releases/download/vVAR::VERSION/lakesoul-flink-1.20-VAR::VERSION.jar.
如果访问 Github 有问题，也可以通过这个链接下载：https://mirrors.huaweicloud.com/repository/maven/com/dmetasoul/lakesoul-flink/1.20-VAR::VERSION/lakesoul-flink-1.20-VAR::VERSION.jar

### 使用SQL Client
```bash
# Start Flink SQL Client
bin/sql-client.sh embedded -j lakesoul-flink-1.20-VAR::VERSION.jar
```
## 创建表

<Tabs
    defaultValue="SQL"
    values={[
        {label: 'Java', value: 'Java'},
        {label: 'SQL', value: 'SQL'},
    ]}>
<TabItem value="Java">

   ```java
   TableEnvironment tEnv = TableEnvironment.create(EnvironmentSettings.inBatchMode());
   String createUserSql = "create table user_info (" +
           "`id` INT," +
           "name STRING," +
           "score INT," +
           "`date` STRING," +
           "region STRING," +
            " PRIMARY KEY (`id`,`name`) NOT ENFORCED"+
           ") PARTITIONED BY (`region`,`date`)"+
            " WITH (" +
           " 'connector'='lakesoul'," +
           " 'hashBucketNum'='4'," +
           " 'use_cdc'='true'," +
           " 'path'='/tmp/lakesoul/flink/sink/test' )";
   tEnv. executeSql(createUserSql);
   ```

</TabItem>
<TabItem value="SQL">

   ```sql
   -- Create the test_table table, use id and name as the joint primary key, use region and date as the two-level range partition, catalog is lakesoul, and database is default
   create table `lakesoul`.`default`.test_table (
               `id` INT,
               name STRING,
               score INT,
               `date` STRING,
               region STRING,
           PRIMARY KEY (`id`,`name`) NOT ENFORCED
           ) PARTITIONED BY (`region`,`date`)
           WITH (
               'connector'='lakesoul',
               'hashBucketNum'='4',
               'use_cdc'='true',
               'path'='file:///tmp/lakesoul/flink/sink/test');
   ```

</TabItem>
</Tabs>


:::tip
建表语句中各个部分参数含义：

| 参数           | 含义说明                                                                                  | 参数填写格式                                  |
| -------------- | ----------------------------------------------------------------------------------------- | --------------------------------------------- |
| PARTITIONED BY | 用于指定表的 Range 分区字段，如果不存在 range 分区字段，则省略                              | PARTITIONED BY (`date`)                       |
| PRIMARY KEY | 用于指定表的主键，可以包含多个列                              | PARIMARY KEY (`id`, `name`) NOT ENFORCED                       |
| connector      | 数据源连接器，用于指定数据源类型                                                          | 'connector'='lakesoul'                        |
| hashBucketNum      | 有主键表必须设置哈希分片数                                                          | 'hashBucketNum'='4'                        |
| path           | 用于指定表的存储路径                                                                      | 'path'='file:///tmp/lakesoul/flink/sink/test' |
| use_cdc        | 设置表是否为 CDC 格式 (参考 [CDC 表格式](../03-Usage%20Docs/04-cdc-ingestion-table.mdx) ) | 'use_cdc'='true'                              |

:::
## 删除表

<Tabs
    defaultValue="SQL"
    values={[
        {label: 'Java', value: 'Java'},
        {label: 'SQL', value: 'SQL'},
    ]}>
<TabItem value="Java">

   ```java
   tEnvs.executeSql("DROP TABLE if exists test_table");
   ```

</TabItem>
<TabItem value="SQL">

   ```sql
   DROP TABLE if exists test_table;
   ```

</TabItem>
</Tabs>

## 插入数据

<Tabs
    defaultValue="SQL"
    values={[
        {label: 'Java', value: 'Java'},
        {label: 'SQL', value: 'SQL'},
    ]}>
<TabItem value="Java">

   ```java
   tEnvs.executeSql("insert into `lakesoul`.`default`.test_table values (1, 'AAA', 98, '2023-05-10', 'China')"). await();
   ```

</TabItem>
<TabItem value="SQL">

   批式：直接插入数据
    ```sql
    insert into `lakesoul`.`default`.test_table values (1,'AAA', 98, '2023-05-10', 'China');
    ```
   流式：构建流式任务，从另一个流式数据源中读取数据并写入到 LakeSoul 表中。如果上游数据是 CDC 的格式，则目标写入的 LakeSoul 表也需要设置为 CDC 表。
    ```sql
    -- 表示将`lakesoul`.`cdcsink`.soure_table表中的全部数据，插入到lakesoul`.`default`.test_table
    insert into `lakesoul`.`default`.test_table select * from `lakesoul`.`cdcsink`.soure_table;
    ```
</TabItem>
</Tabs>

:::caution
1. 对流写入，需要设置 checkpoint 间隔，建议为 1 分钟以上；
3. 根据环境设置相应的时区：

```sql
SET 'table.local-time-zone' = 'Asia/Shanghai';
-- 设置 checkpointing 时间间隔
SET 'execution.checkpointing.interval' = '2min';
```
:::

## 更新数据

<Tabs
    defaultValue="SQL"
    values={[
        {label: 'Java', value: 'Java'},
        {label: 'SQL', value: 'SQL'},
    ]}>
<TabItem value="Java">

   ```java
   tEnvs.executeSql("UPDATE `lakesoul`.`default`.test_table set score = 100 where id = 1") await();
   ```

</TabItem>
<TabItem value="SQL">

   ```sql
   UPDATE `lakesoul`.`default`.test_table set score = 100 where id = 1;
   ```
</TabItem>
</Tabs>

:::caution
注意 `update` 情况下，不允许更新主键、分区列的值。
对于流的执行模式，LakeSoul 已经能够支持 ChangeLog 语义，可以支持增删改。
:::
## Delete Data

<Tabs
    defaultValue="SQL"
    values={[
        {label: 'Java', value: 'Java'},
        {label: 'SQL', value: 'SQL'},
    ]}>
<TabItem value="Java">

   ```java
   tEnvs.executeSql("DELETE FROM `lakesoul`.`default`.test_table where id = 1") await();
   ```

</TabItem>
<TabItem value="SQL">

   ```sql
   DELETE FROM `lakesoul`.`default`.test_table where id = 1;
   ```
</TabItem>
</Tabs>

:::caution
`delete` 情况下，不允许条件中带有分区列。
对于流的执行模式，LakeSoul 已经能够支持 ChangeLog 语义，可以支持增删改。
:::
## 查询数据
### 全量读

<Tabs
    defaultValue="SQL"
    values={[
        {label: 'Java', value: 'Java'},
        {label: 'SQL', value: 'SQL'},
    ]}>
<TabItem value="Java">

   ```java
   // Create a batch execution environment
   tEnvs.executeSql("SELECT * FROM `lakesoul`.`default`.test_table where region='China' and `date`='2023-05-10'").print();
   ```

</TabItem>
<TabItem value="SQL">

   ```sql
   SELECT * FROM `lakesoul`.`default`.test_table where region='China' and `date`='2023-05-10';
   ```
</TabItem>
</Tabs>

### 快照读
LakeSoul 支持对表执行快照读取，用户通过指定分区信息和结束时间戳，可以查询结束时间戳之前的所有数据。
<Tabs
    defaultValue="SQL"
    values={[
        {label: 'Java', value: 'Java'},
        {label: 'SQL', value: 'SQL'},
    ]}>
<TabItem value="Java">

   ```java
   tEnvs.executeSql("SELECT * FROM `lakesoul`.`default`.test_table /*+ OPTIONS('readtype'='snapshot', 'readendtime'='2023-05-01 15:20:15', 'timezone'='Asia/Shanghai')*/ WHERE region='China'").print();
   ```

</TabItem>
<TabItem value="SQL">

   ```sql
   -- Execute snapshot read of test_table in the region=China partition, the end timestamp of the read is 2023-05-01 15:20:15, and the time zone is Asia/Shanghai
   SELECT * FROM `lakesoul`.`default`.test_table /*+ OPTIONS('readtype'='snapshot', 'readendtime'='2023-05-01 15:20:15', 'timezone'='Asia/Shanghai')*/ WHERE region='China';
   ```
</TabItem>
</Tabs>

### 增量范围读
LakeSoul 支持对表执行范围增量读取，用户通过指定分区信息和起始时间戳、结束时间戳，可以查询这一时间范围内的增量数据。
<Tabs
    defaultValue="SQL"
    values={[
        {label: 'Java', value: 'Java'},
        {label: 'SQL', value: 'SQL'},
    ]}>
<TabItem value="Java">

  ```java
   tEnvs.executeSql("SELECT * FROM `lakesoul`.`default`.test_table /*+ OPTIONS('readtype'='incremental'，'readstarttime'='2023-05-01 15:15:15 ', 'readendtime'='2023-05-01 15:20:15', 'timezone'='Asia/Shanghai')*/ WHERE region='China'").print();
  ```

</TabItem>
<TabItem value="SQL">

    ```sql
    -- Incremental reading of test_table in the region=China partition, the read timestamp range is 2023-05-01 15:15:15 to 2023-05-01 15:20:15, and the time zone is Asia/Shanghai
    SELECT * FROM `lakesoul`.`default`.test_table /*+ OPTIONS('readtype'='incremental', 'readstarttime'='2023-05-01 15:15:15 ', 'readendtime'='2023-05-01 15:20:15', 'timezone'='Asia/Shanghai')*/ WHERE region='China';
    ```
</TabItem>
</Tabs>

### 流读
LakeSoul 表支持在 Flink 执行流式读取，流式读基于增量读，用户通过指定起始时间戳和分区信息，可以连续不间断读取自起始时间戳以后的新增数据。若是不设置起始时间戳则从第一条数据读

<Tabs
    defaultValue="SQL"
    values={[
        {label: 'Java', value: 'Java'},
        {label: 'SQL', value: 'SQL'},
    ]}>
<TabItem value="Java">

    ```java
     tEnvs.executeSql("SELECT * FROM `lakesoul`.`default`.test_table /*+ OPTIONS('timezone'='Asia/Shanghai')*/ WHERE region='China'").print();
    ```

</TabItem>
<TabItem value="SQL">

  ```sql
  -- Incremental reading of test_table in the region=China partition, the time zone is Asia/Shanghai
  SELECT * FROM `lakesoul`.`default`.test_table /*+ OPTIONS('timezone'='Asia/Shanghai')*/ WHERE region='China';
  ```
</TabItem>
</Tabs>

在流式读取时，LakeSoul 完整支持 Flink Changelog Stream 语义。对于 LakeSoul CDC 表，增量读取的结果仍然为 CDC 格式，即包含了 `insert`，`update`，`delete` 事件，这些事件会自动转为 Flink RowData 的 RowKind 字段的对应值，从而在 Flink 中实现了全链路的增量计算。

### Lookup Join
LakeSoul 表支持 Flink SQL 中的 Lookup Join 操作。Lookup Join 会将待 Join 的右表缓存在内存中，从而大幅提升 Join 速度，可以在较小维表关联的场景中使用以提升性能。
LakeSoul 默认每隔 60 秒会尝试刷新缓存，这个间隔可以通过在创建维表时设置 `'lookup.join.cache.ttl'='60s'` 表属性来修改。

```sql
CREATE TABLE `lakesoul`.`default`.customers (
            `c_id` INT,
            `name` STRING,
        PRIMARY KEY (`c_id`) NOT ENFORCED)
        WITH (
            'connector'='lakesoul',
            'hashBucketNum'='1',
            'path'='file:///tmp/lakesoul/flink/sink/customers'
            );  
CREATE TABLE `lakesoul`.`default`.orders (
            `o_id` INT,
            `o_c_id` INT,
        PRIMARY KEY (`o_id`) NOT ENFORCED)
        WITH (
            'connector'='lakesoul',
            'hashBucketNum'='1',
            'path'='file:///tmp/lakesoul/flink/sink/orders',
            'lookup.join.cache.ttl'='60s'
            );  
SELECT `o_id`, `c_id`, `name`
FROM
(SELECT *, proctime() as proctime FROM `lakesoul`.`default`.orders) as o
JOIN `lakesoul`.`default`.customers FOR SYSTEM_TIME AS OF o.proctime
ON c_id = o_cid;
```
Orders 表需要与 Customers 表的数据进行 Lookup Join。带有后续 process time 属性的 FOR SYSTEM_TIME AS OF 子句确保在联接运算符处理 Orders 行时，Orders 的每一行都与 join 条件匹配的 Customer 行连接。它还防止连接的 Customer 表在未来发生更新时变更连接结果。lookup join 还需要一个强制的相等连接条件，在上面的示例中是 o_c_id = c_id

:::tip
支持Flink按批式和流式读取lakesoul表，在Flink SQLClient客户端执行命令，切换流式和批式的执行模式。
```sql
-- 按照流式执行Flink任务
SET execution.runtime-mode = streaming;
SET 'execution.checkpointing.interval' = '1min';
-- 按照批式执行Flink任务
SET execution.runtime-mode = batch;
```

使用 Flink SQL，指定条件查询的格式为 `SELECT * FROM test_table /*+ OPTIONS('key'='value')*/ WHERE partition=xxx` 。在任何一种读的模式下，分区可以指定，也可以不指定，也可以只指定一部分分区值，LakeSoul 会自动匹配满足条件的分区。

其中 `/* OPTIONS() */` 为查询选项（hints），必须要直接跟在表名的后面（在 where 等其他子句的前面），LakeSoul 读取时的 hint 选项包括：

| 参数              | 含义说明                                        | 参数填写格式                          |
| ----------------- |---------------------------------------------| ------------------------------------- |
| readtype          | 读类型，可以指定增量读incremental，快照读snapshot，不指定默认全量读 | 'readtype'='incremental'              |
| discoveryinterval | 流式增量读的发现新数据时间间隔，单位毫秒，默认为 30000              | 'discoveryinterval'='10000'           |
| readstarttime     | 起始读时间戳，如果未指定起始时间戳，则默认从起始版本号开始读取             | 'readstarttime'='2023-05-01 15:15:15' |
| readendtime       | 结束读时间戳，如果未指定结束时间戳，则默认读取到当前最新版本号             | 'readendtime'='2023-05-01 15:20:15'   |
| timezone          | 时间戳的时区信息，如果不指定时间戳的时区信息，则默认为按本机时区处理          | 'timezone'='Asia/Sahanghai'           |
:::
